use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;
use std::ops::{Range, RangeInclusive};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use array2::Array2;
use async_bincode::tokio::{AsyncBincodeStream, AsyncDestination};
use dataflow_expression::{BinaryOperator as DfBinaryOperator, Dialect, Expr as DfExpr};
use futures_util::future::TryFutureExt;
use futures_util::stream::futures_unordered::FuturesUnordered;
use futures_util::stream::{StreamExt, TryStreamExt};
use futures_util::{future, ready, Stream};
use petgraph::graph::NodeIndex;
use proptest::arbitrary::Arbitrary;
use rand::prelude::IteratorRandom;
use readyset_data::{
    Bound, BoundedRange, Collation, DfType, DfValue, IntoBoundedRange, RangeBounds,
};
use readyset_errors::{
    internal, internal_err, rpc_err, unsupported, view_err, ReadySetError, ReadySetResult,
};
use readyset_sql::ast::{
    BinaryOperator, Column, ColumnConstraint, ColumnSpecification, ItemPlaceholder, Literal,
    Relation, SelectStatement, SqlIdentifier, SqlType,
};
use readyset_sql::TryFromDialect as _;
use readyset_sql_passes::anonymize::{Anonymize, Anonymizer};
use readyset_tracing::child_span;
use readyset_tracing::presampled::instrument_if_enabled;
use readyset_tracing::propagation::Instrumented;
use readyset_util::intervals::cmp_start_end;
use readyset_util::redacted::Sensitive;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio_tower::multiplex;
use tower::balance::p2c::Balance;
use tower::buffer::Buffer;
use tower::limit::concurrency::ConcurrencyLimit;
use tower::timeout::Timeout;
use tower_service::Service;
use tracing::{debug_span, error, trace};
use tracing_futures::Instrument;
use vec1::{vec1, Vec1};

pub(crate) mod results;

use self::results::{ResultIterator, Results};
use crate::{ReaderAddress, Tagged, Tagger};

type Transport = AsyncBincodeStream<
    tokio::net::TcpStream,
    Tagged<ReadReply>,
    Instrumented<Tagged<ReadQuery>>,
    AsyncDestination,
>;

/// Index of a key column as it exists in the underlying state. During a migration this will be
/// used throughout MIR. In steady state this will refer to the reader key columns.
pub type KeyColumnIdx = usize;

/// Index of a placeholder variable as it appears in the SQL query
pub type PlaceholderIdx = usize;

/// All the information necessary to create an (anonymous) view in the graph.
///
/// This structure is not used *directly* by any of the methods that actually create views in the
/// graph, but is used throughout the codebase as a uniquely identifiable stand-in for a
/// yet-to-be-created view
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ViewCreateRequest {
    /// The query itself, as provided by the user
    pub statement: SelectStatement,

    /// The schema search path to use to resolve table references within the changelist
    ///
    /// This is actually passed as [`recipe::changelist::ChangeList::schema_search_path`] when
    /// views are created.
    pub schema_search_path: Vec<SqlIdentifier>,
}

impl ViewCreateRequest {
    /// Initialize a new [`ViewCreateRequest`] with the given query and schema search path
    pub fn new(statement: SelectStatement, schema_search_path: Vec<SqlIdentifier>) -> Self {
        Self {
            statement,
            schema_search_path,
        }
    }

    /// Anonymize the statement and schema_search_path of the ViewCreateRequest in place
    pub fn to_anonymized_string(&self) -> String {
        let mut anon = self.clone();
        let mut anonymizer = Anonymizer::new();
        anon.statement.anonymize(&mut anonymizer);
        for id in anon.schema_search_path.iter_mut() {
            id.anonymize(&mut anonymizer);
        }

        format!("{anon:?}")
    }
}

/// Representation of how a key column in a [`View`] maps back to a placeholder in the original
/// query
#[derive(Hash, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ViewPlaceholder {
    /// This key column was generated by ReadySet, and has no mapping to the original query. This
    /// is the case, for example, for a "bogokey" column generated for unparameterized queries.
    Generated,

    /// This key column corresponds one-to-one to a placeholder in the original query. Includes the
    /// BinaryOperator used in the placeholder comparison.
    OneToOne(PlaceholderIdx, BinaryOperator),

    /// This key column corresponds to a double-ended "BETWEEN"-style range lookup in the original
    /// query, with the given placeholder indexes for the lower and upper bounds of the range
    /// respectively
    Between(PlaceholderIdx, PlaceholderIdx),

    /// This key column is the page number of a paginated query, which must be calculated by
    /// dividing the value for the `OFFSET` clause by the value for the `LIMIT` in the query
    PageNumber {
        /// The index of the placeholder for the `OFFSET` clause of the original query
        offset_placeholder: PlaceholderIdx,
        /// The `LIMIT` (page size) in the query
        limit: u64,
    },
}

#[derive(Debug)]
struct Endpoint {
    addr: SocketAddr,
    timeout: Duration,
}

/// Identifies the source base table column for a projected column
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnBase {
    /// The name of the column in the base table
    pub column: SqlIdentifier,
    /// The name of the base table for this column
    pub table: Relation,
    /// A list of constraints on the column
    pub constraints: Vec<ColumnConstraint>,
    /// If known, the PostgreSQL OID for the column's base table
    pub table_oid: Option<u32>,
    /// If known, the PostgreSQL `attnum` for the column
    pub attnum: Option<i16>,
    /// Original SQL type
    pub sql_type: SqlType,
}

impl ColumnBase {
    pub fn has_default(&self) -> bool {
        self.constraints
            .iter()
            .any(|c| matches!(c, ColumnConstraint::DefaultValue(_)))
    }

    pub fn is_not_null(&self) -> bool {
        self.constraints
            .iter()
            .any(|c| matches!(c, ColumnConstraint::NotNull))
    }
}

/// Combines the specification for a columns with its base name
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnSchema {
    /// The name of the column
    pub column: Column,
    /// The column's type
    pub column_type: DfType,
    /// If the column is an alias, this field represents its base column
    pub base: Option<ColumnBase>,
}

impl ColumnSchema {
    /// Create a new ColumnSchema from a ColumnSpecification representing a column directly in a
    /// base table with the given name.
    pub fn from_base(
        spec: ColumnSpecification,
        table: Relation,
        dialect: Dialect,
    ) -> ReadySetResult<Self> {
        let collation = spec
            .get_collation()
            .map(|name| Collation::get_or_default(dialect, name));
        Ok(Self {
            base: Some(ColumnBase {
                column: spec.column.name.clone(),
                table,
                constraints: spec.constraints,
                table_oid: None,
                attnum: None,
                sql_type: spec.sql_type.clone(),
            }),
            column: spec.column,
            column_type: DfType::from_sql_type(
                &spec.sql_type,
                dialect,
                |_| None, /* Custom types not allowed for inserts via the adapter */
                collation,
            )?,
        })
    }
}

/// A `ViewSchema` is used to describe the columns of a stored ReadySet
/// view as a vector of columns. The ViewSchema contains a vector with all
/// projected columns and a vector with columns returned to the client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ViewSchema {
    /// The set of columns returned to the client when executing this query.
    returned_cols: Vec<ColumnSchema>,
    /// The set of columns projected at the noria flowgraph reader node.
    projected_cols: Vec<ColumnSchema>,
}

/// SchemaType is passed to most ViewSchema functions to select between the two
/// schemas contained in the ViewSchema struct.
pub enum SchemaType {
    /// Used to select the schema returned to the client when executing this
    /// query.
    ReturnedSchema,
    /// Used to select the schema projected at the noria flowgraph reader node.
    ProjectedSchema,
}

type InnerService = multiplex::Client<
    multiplex::MultiplexTransport<Transport, Tagger>,
    tokio_tower::Error<
        multiplex::MultiplexTransport<Transport, Tagger>,
        Instrumented<Tagged<ReadQuery>>,
    >,
    Instrumented<Tagged<ReadQuery>>,
>;

impl Service<()> for Endpoint {
    type Response = InnerService;
    type Error = tokio::io::Error;

    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _: ()) -> Self::Future {
        let f = tokio::net::TcpStream::connect(self.addr);
        let timeout = self.timeout;
        Box::pin(async move {
            let s = tokio::time::timeout(timeout, f).await??;
            s.set_nodelay(true)?;
            let s = AsyncBincodeStream::from(s).for_async();
            let t = multiplex::MultiplexTransport::new(s, Tagger::default());
            Ok(multiplex::Client::with_error_handler(
                t,
                |e| error!(error = %e, "View server went away"),
            ))
        })
    }
}

impl ViewSchema {
    /// Create a ViewSchema from returned and projected column schema vectors
    pub fn new(returned_cols: Vec<ColumnSchema>, projected_cols: Vec<ColumnSchema>) -> ViewSchema {
        ViewSchema {
            returned_cols,
            projected_cols,
        }
    }

    /// Get the schema specified by the schema type
    pub fn schema(&self, schema_type: SchemaType) -> &[ColumnSchema] {
        match schema_type {
            SchemaType::ReturnedSchema => &self.returned_cols,
            SchemaType::ProjectedSchema => &self.projected_cols,
        }
    }

    /// Return a vector specifying the types of the columns for the requested indices
    pub fn col_types<I>(&self, indices: I, schema_type: SchemaType) -> ReadySetResult<Vec<&DfType>>
    where
        I: IntoIterator<Item = usize>,
    {
        let schema = self.schema(schema_type);
        indices
            .into_iter()
            .map(|i| schema.get(i).map(|c| &c.column_type))
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| internal_err!("Schema expects valid column indices"))
    }

    /// Convert the given iterator [`Columns`] to a `Vec` of [`ColumnSchema`]. The columns match if
    /// either the column name matches (the alias) or the real base name
    pub fn to_cols<'a, 'b, T>(
        &'a self,
        cols: T,
        schema_type: SchemaType,
    ) -> ReadySetResult<Vec<&'a ColumnSchema>>
    where
        T: IntoIterator<Item = &'b Column>,
    {
        let mut by_name = HashMap::new();
        let mut by_base_name = HashMap::new();
        for cs in self.schema(schema_type) {
            by_name.insert(&cs.column.name, cs);
            if let Some(base) = &cs.base {
                by_base_name.insert(&base.column, cs);
            }
        }

        cols.into_iter()
            .map(move |c| {
                by_name
                    .get(&c.name)
                    .or_else(|| by_base_name.get(&c.name))
                    .copied()
                    .ok_or_else(|| internal_err!("Column {} not found", c.display_unquoted()))
            })
            .collect()
    }

    /// Get the indices of the columns in the schema that correspond to the list of provided
    /// [`readyset_sql::ast::Column`]. The columns match if either the column name matches (the
    /// alias) or the real base name
    pub fn indices_for_cols<'a, T>(
        &self,
        cols: T,
        schema_type: SchemaType,
    ) -> ReadySetResult<Vec<usize>>
    where
        T: Iterator<Item = &'a Column>,
    {
        let schema = self.schema(schema_type);

        cols.map(|c| {
            schema.iter().position(|e| {
                e.column.name == c.name
                    || e.base.as_ref().map(|b| b.column == c.name).unwrap_or(false)
            })
        })
        .collect::<Option<Vec<_>>>()
        .ok_or_else(|| internal_err!("Schema expects all columns to be present"))
    }
}

impl ColumnSchema {
    /// Consume the schema, returning the type for the column
    pub fn into_type(self) -> DfType {
        self.column_type
    }
}

fn make_views_stream(addr: SocketAddr, timeout: Duration) -> Discover {
    // TODO: use whatever comes out of https://github.com/tower-rs/tower/issues/456 instead of
    // creating _all_ the connections every time.
    Box::pin(
        (0..crate::VIEW_POOL_SIZE)
            .map(|i| async move {
                let svc = Endpoint { addr, timeout }.call(()).await?;
                Ok(tower::discover::Change::Insert(i, svc))
            })
            .collect::<futures_util::stream::FuturesUnordered<_>>(),
    ) as Pin<Box<_>>
}

fn make_views_discover(addr: SocketAddr, timeout: Duration) -> Discover {
    make_views_stream(addr, timeout)
}

// Send bounds are needed due to https://github.com/rust-lang/rust/issues/55997
pub(crate) type Discover = Pin<
    Box<
        dyn Stream<Item = Result<tower::discover::Change<usize, InnerService>, tokio::io::Error>>
            + Send,
    >,
>;

pub(crate) type ViewRpc = Buffer<
    Timeout<ConcurrencyLimit<Balance<Discover, Instrumented<Tagged<ReadQuery>>>>>,
    Instrumented<Tagged<ReadQuery>>,
>;

/// Representation for a comparison predicate against a set of keys
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum KeyComparison {
    /// Look up exactly one key
    Equal(Vec1<DfValue>),

    /// Look up all keys within a range
    Range(BoundedRange<Vec1<DfValue>>),
}

#[allow(clippy::len_without_is_empty)] // can never be empty
impl KeyComparison {
    /// Attempt to construct a key comparison comparing the given key using the given binary
    /// operator
    pub fn from_key_and_operator(
        key: Vec<DfValue>,
        operator: BinaryOperator,
    ) -> ReadySetResult<Self> {
        use BinaryOperator::*;

        let key = Vec1::try_from(key).map_err(|_| ReadySetError::EmptyKey)?;
        let inner = match operator {
            Greater => key.range_from(),
            GreaterOrEqual => key.range_from_inclusive(),
            Less => key.range_to(),
            LessOrEqual => key.range_to_inclusive(),
            Equal => return Ok(key.into()),
            _ => unsupported!("Unsupported operator `{operator}` in key"),
        };
        Ok(Self::Range(inner))
    }

    /// Project a KeyComparison into an optional equality predicate, or return None if it's a range
    /// predicate. Handles both [`Equal`] and single-length [`Range`]s
    pub fn equal(&self) -> Option<&Vec1<DfValue>> {
        match self {
            KeyComparison::Equal(ref key) => Some(key),
            KeyComparison::Range((Bound::Included(ref key), Bound::Included(ref key2)))
                if key == key2 =>
            {
                Some(key)
            }
            _ => None,
        }
    }

    /// Convert a KeyComparison into an optional equality predicate, consuming the key comparison,
    /// or return None if it's a range predicate. Handles both [`Equal`] and single-length
    /// [`Range`]s
    pub fn into_equal(self) -> Option<Vec1<DfValue>> {
        match self {
            KeyComparison::Equal(key) => Some(key),
            KeyComparison::Range((Bound::Included(key), Bound::Included(ref key2)))
                if key == *key2 =>
            {
                Some(key)
            }
            _ => None,
        }
    }

    /// Project a KeyComparison into an optional range predicate, or return None if it's a range
    /// predicate
    pub fn range(&self) -> Option<&BoundedRange<Vec1<DfValue>>> {
        match self {
            KeyComparison::Range(ref range) => Some(range),
            _ => None,
        }
    }

    /// Build a [`KeyComparison`] from a range of keys.
    ///
    /// If the range has length 1 (both the ends are inclusive bounds on the same value) will return
    /// [`KeyComparison::Equal`].
    pub fn from_range<R>(range: &R) -> Self
    where
        R: RangeBounds<Vec1<DfValue>>,
    {
        match (range.start_bound(), range.end_bound()) {
            (Bound::Included(key1), Bound::Included(key2)) if key1 == key2 => {
                KeyComparison::Equal(key1.clone())
            }
            (start, end) => KeyComparison::Range((start.cloned(), end.cloned())),
        }
    }

    /// Convert the [`KeyComparison`] into a [`KeyComparison::Range`].
    ///
    /// If self was an Equal comparison, return a Range comparison with inclusive upper and lower
    /// bounds equal to the Equal key.
    pub fn into_range(self) -> KeyComparison {
        match self {
            KeyComparison::Range(_) => self,
            KeyComparison::Equal(key) => {
                KeyComparison::Range((Bound::Included(key.clone()), Bound::Included(key)))
            }
        }
    }

    /// Returns true if this KeyComparison represents a range where the upper bound is less than the
    /// lower bound
    ///
    /// # Examples
    ///
    /// ```
    /// use readyset_client::KeyComparison;
    /// use readyset_data::DfValue;
    /// use vec1::vec1;
    ///
    /// let not_reversed =
    ///     KeyComparison::from_range(&(vec1![DfValue::from(0)]..=vec1![DfValue::from(1)]));
    /// assert!(!not_reversed.is_reversed_range());
    ///
    /// let reversed = KeyComparison::from_range(&(vec1![DfValue::from(1)]..=vec1![DfValue::from(0)]));
    /// assert!(reversed.is_reversed_range());
    /// ```
    pub fn is_reversed_range(&self) -> bool {
        cmp_start_end(
            <Self as RangeBounds<Vec1<DfValue>>>::start_bound(self).into(),
            <Self as RangeBounds<Vec1<DfValue>>>::end_bound(self).into(),
        ) == Ordering::Greater
    }

    /// Returns the shard key(s) that the given cell in this [`KeyComparison`] must target, given
    /// the total number of shards.
    ///
    /// ## Invariants
    /// * the `key_idx` must be in the `key`s.
    /// * the `key`s should have at least one element.
    pub fn shard_keys_at(&self, key_idx: usize, num_shards: usize) -> Vec<usize> {
        match self {
            KeyComparison::Equal(key) => vec![crate::shard_by(&key[key_idx], num_shards)],
            // Since we currently implement hash-based sharding, any non-point query must target all
            // shards. This restriction could be lifted in the future by implementing (perhaps
            // optional) range-based sharding, likely with rebalancing. See Guillote-Blouin, J.
            // (2020) Implementing Range Queries and Write Policies in a Partially-Materialized
            // Data-Flow [Unpublished Master's thesis]. Harvard University S 2.4
            _ => (0..num_shards).collect(),
        }
    }

    /// Returns the shard key(s) that the first column in this [`KeyComparison`] must target, given
    /// the total number of shards
    pub fn shard_keys(&self, num_shards: usize) -> Vec<usize> {
        self.shard_keys_at(0, num_shards)
    }

    /// Returns the length of the key this [`KeyComparison`] is comparing against.
    ///
    /// Since all KeyComparisons wrap a [`Vec1`], this function will never return 0
    pub fn len(&self) -> usize {
        match self {
            Self::Equal(key) => key.len(),
            Self::Range((
                Bound::Included(ref start) | Bound::Excluded(ref start),
                Bound::Included(ref end) | Bound::Excluded(ref end),
            )) => {
                debug_assert_eq!(start.len(), end.len());
                start.len()
            }
        }
    }

    /// Returns true if the given `key` is covered by this [`KeyComparison`].
    ///
    /// Concretely, this is the case if the [`KeyComparison`] is either an [equality][] match on
    /// `key`, or a [range][] match that covers `key`.
    ///
    /// [equality]: KeyComparison::equal
    /// [range]: KeyComparison::range
    ///
    /// # Examples
    ///
    /// Equal keys contain themselves and only themselves:
    ///
    /// ```rust
    /// use readyset_client::KeyComparison;
    /// use readyset_data::DfValue;
    /// use vec1::vec1;
    ///
    /// let key = KeyComparison::Equal(vec1![1.into(), 2.into()]);
    /// assert!(key.contains(&[1.into(), 2.into()]));
    /// assert!(!key.contains(&[1.into(), 3.into()]));
    /// ```
    ///
    /// Range keys contain anything in the range, comparing lexicographically
    ///
    /// ```rust
    /// use readyset_client::KeyComparison;
    /// use readyset_data::Bound::*;
    /// use readyset_data::DfValue;
    /// use vec1::vec1;
    ///
    /// let key = KeyComparison::Range((
    ///     Included(vec1![1.into(), 2.into()]),
    ///     Excluded(vec1![1.into(), 5.into()]),
    /// ));
    ///
    /// assert!(key.contains(&[1.into(), 3.into()]));
    /// assert!(!key.contains(&[2.into(), 2.into()]));
    /// ```
    pub fn contains<'a, I>(&'a self, key: I) -> bool
    where
        I: IntoIterator<Item = &'a DfValue> + Clone,
    {
        match self {
            Self::Equal(equal) => key.into_iter().cmp(equal.iter()) == Ordering::Equal,
            Self::Range((lower, upper)) => {
                (match lower {
                    Bound::Included(start) => key.clone().into_iter().cmp(start.iter()).is_ge(),
                    Bound::Excluded(start) => key.clone().into_iter().cmp(start.iter()).is_gt(),
                }) && (match upper {
                    Bound::Included(end) => key.into_iter().cmp(end.iter()).is_le(),
                    Bound::Excluded(end) => key.into_iter().cmp(end.iter()).is_lt(),
                })
            }
        }
    }

    /// Returns true if this [`KeyComparison`] is an equality comparison predicate.
    pub fn is_equal(&self) -> bool {
        matches!(self, KeyComparison::Equal(_))
    }

    /// Returns `true` if this [`KeyComparison`] is a [`Range`] key.
    ///
    /// [`Range`]: KeyComparison::Range
    pub fn is_range(&self) -> bool {
        matches!(self, KeyComparison::Range(..))
    }

    /// Construct a new [`KeyComparison`] of the same kind as `self` by mapping the given function
    /// over either the equal key or one or both endpoints of the range key
    #[must_use]
    pub fn map_endpoints<F>(self, mut f: F) -> Self
    where
        F: FnMut(Vec1<DfValue>) -> Vec1<DfValue>,
    {
        match self {
            KeyComparison::Equal(k) => KeyComparison::Equal(f(k)),
            KeyComparison::Range((lower, upper)) => {
                KeyComparison::Range((lower.map(&mut f), upper.map(&mut f)))
            }
        }
    }
}

impl PartialEq for KeyComparison {
    fn eq(&self, other: &Self) -> bool {
        use KeyComparison::*;
        match (self, other) {
            (Equal(k1), Equal(k2)) => k1 == k2,
            (Range(r1), Range(r2)) => r1 == r2,
            (Equal(eq), Range((Bound::Included(k1), Bound::Included(k2)))) => eq == k1 && k1 == k2,
            (Equal(_), Range(_)) => false,
            (Range(_), Equal(_)) => other == self,
        }
    }
}

impl Eq for KeyComparison {}

impl Hash for KeyComparison {
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self {
            Self::Equal(k) => k.hash(state),
            Self::Range((Bound::Included(k1), Bound::Included(k2))) if k1 == k2 => k1.hash(state),
            Self::Range(r) => r.hash(state),
        }
    }
}

impl TryFrom<Vec<DfValue>> for KeyComparison {
    type Error = vec1::Size0Error;

    /// Converts to a [`KeyComparison::Equal`]. Returns an error if the input vector is empty
    fn try_from(value: Vec<DfValue>) -> Result<Self, Self::Error> {
        Ok(Vec1::try_from(value)?.into())
    }
}

impl From<Vec1<DfValue>> for KeyComparison {
    /// Converts to a [`KeyComparison::Equal`]
    fn from(key: Vec1<DfValue>) -> Self {
        KeyComparison::Equal(key)
    }
}

impl TryFrom<RangeInclusive<Vec<DfValue>>> for KeyComparison {
    type Error = vec1::Size0Error;

    fn try_from(range: RangeInclusive<Vec<DfValue>>) -> Result<Self, Self::Error> {
        let (start, end) = range.into_inner();
        Ok(KeyComparison::Range((
            Bound::Included(Vec1::try_from(start)?),
            Bound::Included(Vec1::try_from(end)?),
        )))
    }
}

impl From<Range<Vec1<DfValue>>> for KeyComparison {
    fn from(range: Range<Vec1<DfValue>>) -> Self {
        KeyComparison::Range((Bound::Included(range.start), Bound::Excluded(range.end)))
    }
}

impl TryFrom<BoundedRange<Vec<DfValue>>> for KeyComparison {
    type Error = vec1::Size0Error;

    /// Converts to a [`KeyComparison::Range`]
    fn try_from((lower, upper): BoundedRange<Vec<DfValue>>) -> Result<Self, Self::Error> {
        let convert_bound = |bound| match bound {
            Bound::Included(x) => Ok(Bound::Included(Vec1::try_from(x)?)),
            Bound::Excluded(x) => Ok(Bound::Excluded(Vec1::try_from(x)?)),
        };
        Ok(Self::Range((convert_bound(lower)?, convert_bound(upper)?)))
    }
}

impl RangeBounds<Vec1<DfValue>> for KeyComparison {
    fn start_bound(&self) -> Bound<&Vec1<DfValue>> {
        use Bound::*;
        use KeyComparison::*;
        match self {
            Equal(ref key) => Included(key),
            Range(bp) => bp.start_bound(),
        }
    }

    fn end_bound(&self) -> Bound<&Vec1<DfValue>> {
        use Bound::*;
        use KeyComparison::*;
        match self {
            Equal(ref key) => Included(key),
            Range(bp) => bp.end_bound(),
        }
    }
}

impl RangeBounds<Vec<DfValue>> for KeyComparison {
    fn start_bound(&self) -> Bound<&Vec<DfValue>> {
        self.start_bound().map(Vec1::as_vec)
    }

    fn end_bound(&self) -> Bound<&Vec<DfValue>> {
        self.end_bound().map(Vec1::as_vec)
    }
}

impl RangeBounds<Vec1<DfValue>> for &KeyComparison {
    fn start_bound(&self) -> Bound<&Vec1<DfValue>> {
        use KeyComparison::*;
        match self {
            Equal(ref key) => Bound::Included(key),
            Range(bp) => bp.start_bound(),
        }
    }

    fn end_bound(&self) -> Bound<&Vec1<DfValue>> {
        use KeyComparison::*;
        match self {
            Equal(ref key) => Bound::Included(key),
            Range(bp) => bp.end_bound(),
        }
    }
}

impl RangeBounds<Vec<DfValue>> for &KeyComparison {
    fn start_bound(&self) -> Bound<&Vec<DfValue>> {
        (**self).start_bound()
    }

    fn end_bound(&self) -> Bound<&Vec<DfValue>> {
        (**self).end_bound()
    }
}

impl std::ops::RangeBounds<Vec<DfValue>> for KeyComparison {
    fn start_bound(&self) -> std::ops::Bound<&Vec<DfValue>> {
        RangeBounds::start_bound(self).into()
    }

    fn end_bound(&self) -> std::ops::Bound<&Vec<DfValue>> {
        RangeBounds::end_bound(self).into()
    }
}

impl Arbitrary for KeyComparison {
    type Parameters = ();
    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        use proptest::arbitrary::any_with;
        use proptest::prop_oneof;
        use proptest::strategy::Strategy;

        let bound = || {
            any_with::<Bound<Vec<DfValue>>>(((1..100).into(), None)).prop_map(|bound| {
                #[allow(clippy::unwrap_used)]
                // This is only used for testing, so we allow calling `unwrap()`, and because we
                // know we are generating vectors of length 1 and beyond.
                bound.map(|k| Vec1::try_from_vec(k).unwrap())
            })
        };

        prop_oneof![
            any_with::<Vec<DfValue>>(((1..100).into(), None)).prop_map(|k| {
                #[allow(clippy::unwrap_used)]
                // This is only used for testing, so we allow calling `unwrap()`, and because we
                // know we are generating vectors of length 1 and beyond.
                KeyComparison::try_from(k).unwrap()
            }),
            (bound(), bound()).prop_map(KeyComparison::Range)
        ]
        .boxed()
    }

    type Strategy = proptest::strategy::BoxedStrategy<KeyComparison>;
}

#[derive(Serialize, Deserialize, Debug)]
#[allow(clippy::large_enum_variant)] // TODO: benchmark cost/benefit of boxing ViewQuery
pub enum ReadQuery {
    /// Read from a leaf view
    Normal {
        /// Where to read from
        target: ReaderAddress,
        /// View query to run
        query: ViewQuery,
    },
    /// Read the size of a leaf view
    Size {
        /// Where to read from
        target: ReaderAddress,
    },
    /// Read all keys from a leaf view (for debugging)
    /// TODO(alex): queries with this value are not totally implemented, and might not actually
    /// work
    Keys {
        /// Where to read from
        target: ReaderAddress,
    },
}

/// The result of a lookup to a view.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum LookupResult<D> {
    /// The view query was executed in non-blocking mode and resulted in a cache miss.
    NonBlockingMiss,
    /// The results of the view query lookup.
    Results(Vec<D>, ReadReplyStats),
}

impl<D> LookupResult<D> {
    /// Maps a set of lookup results from Vec<D> to Vec<U>.
    pub fn map_results<U, F>(self, mut f: F) -> LookupResult<U>
    where
        F: FnMut(D, &ReadReplyStats) -> U,
    {
        match self {
            Self::NonBlockingMiss => LookupResult::NonBlockingMiss,
            Self::Results(d, stats) => {
                LookupResult::Results(d.into_iter().map(|d| f(d, &stats)).collect(), stats)
            }
        }
    }

    /// Converts a lookup result into the inner `Results` type.
    pub fn into_results(self) -> Option<Vec<D>> {
        if let Self::Results(v, _) = self {
            Some(v)
        } else {
            None
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq, Clone)]
pub struct ReadReplyStats {
    /// The count of cache misses which have occurred
    pub cache_misses: u64,
}

impl ReadReplyStats {
    /// Creates a new [`ReadReplyStats`]
    #[must_use]
    pub fn merge(&self, other: &Self) -> Self {
        Self {
            cache_misses: self.cache_misses + other.cache_misses,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
pub enum ReadReply<D = ReadReplyBatch> {
    /// A reply to a normal lookup request
    Normal(ReadySetResult<LookupResult<D>>),
    /// Read size of view
    Size(usize),
    // Read keys of view
    Keys(Vec<Vec<DfValue>>),
}

impl<D> ReadReply<D> {
    /// Convert this [`ReadReply`] into a [`ReadReply::Normal`], consuming self
    pub fn into_normal(self) -> Option<ReadySetResult<LookupResult<D>>> {
        if let Self::Normal(v) = self {
            Some(v)
        } else {
            None
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReaderHandleBuilder {
    pub name: Relation,

    pub node: NodeIndex,
    pub columns: Arc<[SqlIdentifier]>,
    pub schema: Option<ViewSchema>,

    /// replica -> shard index -> addr (if running)
    pub replica_shard_addrs: Array2<Option<SocketAddr>>,

    /// (view_placeholder, key_column_index) pairs according to their mapping. Contains exactly one
    /// entry for each key column at the reader.
    pub key_mapping: Vec<(ViewPlaceholder, KeyColumnIdx)>,

    /// The amount of time before a view request RPC is terminated.
    pub view_request_timeout: Duration,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
/// Builds a [`ReusedReaderHandle`]
pub struct ReusedReaderHandleBuilder {
    /// The builder for the [`ReaderHandle`] that is being reused
    pub builder: ReaderHandleBuilder,
    /// Remapping from [`PlaceholderIdx`]s in the executed query to inlined [`Literal`]s in the
    /// cached query.
    ///
    /// This view can only be used if the values in this map match the values passed on execution
    /// for the given placeholders.
    pub required_values: HashMap<PlaceholderIdx, Literal>,
    /// Remapping from [`PlaceholderIdx`]s in the migrated query to [`Literal`]s in the executed
    /// query.
    ///
    /// This remapping is applied to [`ViewBuilder::key_mapping`] when building keys.
    pub key_remapping: HashMap<PlaceholderIdx, Literal>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
/// A single [`ReaderHandleBuilder`] or a collection of [`ReusedReaderHandleBuilder`]s
pub enum ViewBuilder {
    /// A [`ViewBuilder`]
    Single(ReaderHandleBuilder),
    /// A collection of [`ReusedReaderHandleBuilder`]s
    MultipleReused(Vec1<ReusedReaderHandleBuilder>),
}

impl ReaderHandleBuilder {
    /// Build a [`ReaderHandle`] out of a [`ReaderHandleBuilder`].
    ///
    /// If `replica` is specified, this selects the reader replica with that index, returning an
    /// error if the index is out of bounds. Otherwise, a replica is selected at random
    pub async fn build(
        &self,
        replica: Option<usize>,
        rpcs: Arc<Mutex<HashMap<(SocketAddr, usize), ViewRpc>>>,
    ) -> ReadySetResult<ReaderHandle> {
        let node = self.node;

        let shards = match replica {
            Some(replica) => self.replica_shard_addrs.get(replica),
            None if self.replica_shard_addrs.num_rows() == 1 => Some(&self.replica_shard_addrs[0]),
            None => self
                .replica_shard_addrs
                .rows()
                .filter(|a| a.iter().all(|addr| addr.is_some()))
                .choose(&mut rand::rng()),
        }
        .ok_or_else(|| {
            if replica.is_none() && self.replica_shard_addrs.num_rows() != 1 {
                ReadySetError::ReaderReplicaNotRunning { replica: 0, node }
            } else {
                ReadySetError::ViewReplicaOutOfBounds {
                    replica: replica.unwrap_or(0),
                    view_name: self.name.clone().display_unquoted().to_string(),
                    num_replicas: self.replica_shard_addrs.num_rows(),
                }
            }
        })?;

        let columns = self.columns.clone();
        let schema = self.schema.clone();
        let key_mapping = self.key_mapping.clone();

        let mut addrs = Vec::with_capacity(shards.len());
        let mut conns = Vec::with_capacity(shards.len());

        for (shardi, shard_addr) in shards.iter().enumerate() {
            use std::collections::hash_map::Entry;

            let Some(shard_addr) = *shard_addr else {
                return Err(ReadySetError::ReaderReplicaNotRunning {
                    replica: replica.unwrap_or(0),
                    node,
                });
            };

            addrs.push(shard_addr);

            // one entry per shard so that we can send sharded requests in parallel even if
            // they happen to be targeting the same machine.
            let mut rpcs = rpcs.lock().await;
            #[allow(clippy::significant_drop_in_scrutinee)]
            let s = match rpcs.entry((shard_addr, shardi)) {
                Entry::Occupied(e) => e.get().clone(),
                Entry::Vacant(h) => {
                    // TODO: maybe always use the same local port?
                    let (c, w) = Buffer::pair(
                        Timeout::new(
                            ConcurrencyLimit::new(
                                Balance::new(make_views_discover(
                                    shard_addr,
                                    self.view_request_timeout,
                                )),
                                crate::PENDING_LIMIT,
                            ),
                            self.view_request_timeout,
                        ),
                        crate::BUFFER_TO_POOL,
                    );
                    tokio::spawn(w.instrument(debug_span!(
                        "view_worker",
                        addr = %shard_addr,
                        shard = shardi
                    )));
                    h.insert(c.clone());
                    c
                }
            };
            conns.push(s);
        }

        Ok(ReaderHandle {
            name: self.name.clone(),
            node,
            schema,
            columns,
            key_mapping,
            shard_addrs: addrs,
            shards: Vec1::try_from_vec(conns).map_err(|_| {
                internal_err!(
                    "cannot create view {} without shards",
                    self.name.display_unquoted()
                )
            })?,
        })
    }
}

impl ViewBuilder {
    /// Build a `View` from `ViewBuilder`. Wraps ReaderHandleBuilder::build().
    pub async fn build(
        &self,
        replica: Option<usize>,
        rpcs: Arc<Mutex<HashMap<(SocketAddr, usize), ViewRpc>>>,
    ) -> ReadySetResult<View> {
        match self {
            ViewBuilder::Single(builder) => Ok(View::Single(builder.build(replica, rpcs).await?)),
            ViewBuilder::MultipleReused(builders) => {
                let mut handles = Vec::with_capacity(builders.len());
                for ReusedReaderHandleBuilder {
                    builder,
                    key_remapping,
                    required_values,
                } in builders
                {
                    let reader_handle = builder.build(replica, rpcs.clone()).await?;
                    handles.push(ReusedReaderHandle {
                        reader_handle,
                        key_remapping: key_remapping.clone(),
                        required_values: required_values.clone(),
                    })
                }
                Ok(View::MultipleReused(handles.try_into().unwrap()))
            }
        }
    }
}

/// A `ReaderHandle` is used to query previously defined external reader nodes.
///
/// Note that if you create multiple `ReaderHandle`s from a single `ReadySetHandle`, they may
/// share connections to the Readyset workers.
#[derive(Clone)]
pub struct ReaderHandle {
    name: Relation,
    node: NodeIndex,
    columns: Arc<[SqlIdentifier]>,
    schema: Option<ViewSchema>,
    /// (view_placeholder, key_column_index) pairs according to their mapping. Contains exactly
    /// one entry for each key column at the reader.
    key_mapping: Vec<(ViewPlaceholder, KeyColumnIdx)>,
    shards: Vec1<ViewRpc>,
    shard_addrs: Vec<SocketAddr>,
}

impl fmt::Debug for ReaderHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReaderHandle")
            .field("node", &self.node)
            .field("columns", &self.columns)
            .field("shard_addrs", &self.shard_addrs)
            .finish_non_exhaustive()
    }
}

/// A [`ReaderHandle`] to a cached query that is different than the query being executed.
///
/// A query can reuse a [`ReaderHandle`] for a cached query if the cached query is equivalent for
/// some set of parameter values passed at execution.
#[derive(Debug, Clone)]
pub struct ReusedReaderHandle {
    /// The [`ReaderHandle`] that reuses the reader of another query for this query.
    reader_handle: ReaderHandle,
    /// Remapping from [`PlaceholderIdx`]s in the executed query to inlined [`Literal`]s in the
    /// cached query.
    ///
    /// This view can only be used if the values in this map match the values passed on execution
    /// for the given placeholders.
    required_values: HashMap<PlaceholderIdx, Literal>,
    // Remapping from [`PlaceholderIdx`]s in the cached query to [`Literal`]s in the executed
    // query. This remapping is applied to [`key_mapping`] when building keys.
    key_remapping: HashMap<PlaceholderIdx, Literal>,
}

/// A wrapper to hold either a [`ReaderHandle`] or collection of [`ReusedReaderHandle`]s.
#[derive(Debug, Clone)]
pub enum View {
    /// A [`ReaderHandle`] for a cached query.
    Single(ReaderHandle),
    /// A collection of [`ReusedReaderHandle`]s that may satisfy a query for a given set of
    /// placeholders passed at execution.
    ///
    /// There may be multiple handles because a single query may map to multiple caches for
    /// different inlined values (e.g., `x = ?` maps to `x = 1` and `x = 2`)
    MultipleReused(Vec1<ReusedReaderHandle>),
}

/// A read query to be run against a [`View`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ViewQuery {
    /// Key comparisons to read with.
    pub key_comparisons: Vec<KeyComparison>,
    /// Whether the query should block.
    pub block: bool,
    /// Expression to use to filter values after they're returned from the underlying reader.
    ///
    /// This expression will be evaluated on each of the rows returned from the reader, and any
    /// rows for which it evaluates to a non-[truthy][] value will be omitted from the result set.
    ///
    /// [truthy]: DfValue::is_truthy
    pub filter: Option<DfExpr>,
    /// An optional limit to the number of values to return
    pub limit: Option<usize>,
    /// An optional offset to skip the given number of rows from the beginning of the result set
    pub offset: Option<usize>,
}

impl From<(Vec<KeyComparison>, bool)> for ViewQuery {
    fn from((key_comparisons, block): (Vec<KeyComparison>, bool)) -> Self {
        Self {
            key_comparisons,
            block,
            filter: None,
            limit: None,
            offset: None,
        }
    }
}

impl Service<ViewQuery> for ReaderHandle {
    type Response = LookupResult<Results>;
    type Error = ReadySetError;

    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        for s in &mut self.shards {
            let ni = self.node;
            ready!(s.poll_ready(cx))
                .map_err(rpc_err!("<View as Service<ViewQuery>>::poll_ready"))
                .map_err(|e| view_err(ni, e))?
        }
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, mut query: ViewQuery) -> Self::Future {
        let ni = self.node;
        let span = child_span!(
            INFO,
            "view-request",
            key_comparisons = ?Sensitive(&query.key_comparisons),
            node = self.node.index()
        );

        if self.shards.len() == 1 {
            let request = span.in_scope(|| {
                Instrumented::from(Tagged::from(ReadQuery::Normal {
                    target: ReaderAddress {
                        node: self.node,
                        name: self.name.clone(),
                        shard: 0,
                    },
                    query,
                }))
            });

            trace!("submit request");

            return Box::pin(future::Either::<_, Self::Future>::Left(
                self.shards
                    .first_mut()
                    .call(request)
                    .map_err(rpc_err!(
                        "<View as Service<ViewQuery>>::call",
                        multiplex::MultiplexTransport<Transport, Tagger>,
                        Instrumented<Tagged<ReadQuery>>,
                    ))
                    .and_then(move |reply| {
                        let future = async move {
                            reply
                                .v
                                .into_normal()
                                .ok_or_else(|| {
                                    internal_err!("Unexpected response type from reader service")
                                })?
                                .map(|l| {
                                    l.map_results(|rows, stats| {
                                        Results::with_stats(rows.into(), stats.clone())
                                    })
                                })
                        };
                        instrument_if_enabled(future, span)
                    })
                    .map_err(move |e| view_err(ni, e)),
            ));
        }

        span.in_scope(|| trace!("shard request"));
        let mut shard_queries = vec![Vec::new(); self.shards.len()];
        for comparison in query.key_comparisons.drain(..) {
            for shard in comparison.shard_keys(self.shards.len()) {
                shard_queries[shard].push(comparison.clone());
            }
        }

        let node = self.node;
        let name = self.name.clone();
        Box::pin(future::Either::<Self::Future, _>::Right(
            self.shards
                .iter_mut()
                .enumerate()
                .zip(shard_queries)
                .filter_map(|((shardi, shard), shard_queries)| {
                    if shard_queries.is_empty() {
                        // poll_ready reserves a sender slot which we have to release
                        // we do that by dropping the old handle and replacing it with a clone
                        // https://github.com/tokio-rs/tokio/issues/898
                        *shard = shard.clone();
                        None
                    } else {
                        Some(((shardi, shard), shard_queries))
                    }
                })
                .map(move |((shardi, shard), shard_queries)| {
                    // The double-enter here is used to crate an inner span for the "view-shard"
                    // portion of the request, and ensure that its parent is the "view-request"
                    // span.
                    let _guard = tracing::Span::enter(&span);
                    let span = child_span!(INFO, "view-shard", shardi);
                    let _guard = tracing::Span::enter(&span);

                    // NOTE: Sharded views can't actually work with aggregates, order by, limit or
                    // offset
                    let request = Instrumented::from(Tagged::from(ReadQuery::Normal {
                        target: ReaderAddress {
                            node,
                            name: name.clone(),
                            shard: shardi,
                        },
                        query: ViewQuery {
                            key_comparisons: shard_queries,
                            block: query.block,
                            filter: query.filter.clone(),
                            limit: query.limit,
                            offset: query.offset,
                        },
                    }));

                    trace!("submit request shard");

                    shard
                        .call(request)
                        .map_err(rpc_err!("<View as Service<ViewQuery>>::call"))
                        .and_then(|reply| async move {
                            reply.v.into_normal().ok_or_else(|| {
                                internal_err!("Unexpected response type from reader service")
                            })?
                        })
                        .map_err(move |e| view_err(ni, e))
                })
                .collect::<FuturesUnordered<_>>()
                .try_collect::<Vec<LookupResult<ReadReplyBatch>>>()
                .map_ok(move |e| {
                    // Flatten this to a single LookupResult<Results>.
                    e.into_iter().fold(
                        LookupResult::Results(Vec::new(), ReadReplyStats::default()),
                        |mut acc, x| {
                            if let LookupResult::Results(d, _) = &mut acc {
                                match x {
                                    LookupResult::NonBlockingMiss => {
                                        return LookupResult::NonBlockingMiss;
                                    }
                                    LookupResult::Results(u, stats) => {
                                        d.extend(u.into_iter().map(|rows| {
                                            Results::with_stats(rows.into(), stats.clone())
                                        }));
                                    }
                                }
                            }
                            acc
                        },
                    )
                }),
        ))
    }
}

#[allow(clippy::len_without_is_empty)]
impl ReaderHandle {
    /// Get the list of columns in this view.
    pub fn columns(&self) -> &[SqlIdentifier] {
        &self.columns
    }

    /// Get a slice of the columns.
    pub fn column_slice(&self) -> Arc<[SqlIdentifier]> {
        Arc::clone(&self.columns)
    }

    /// Get the schema definition of this view.
    pub fn schema(&self) -> Option<&ViewSchema> {
        self.schema.as_ref()
    }

    /// Get the NodeIndex of the dataflow node that this
    /// view refers to.
    pub fn node(&self) -> &NodeIndex {
        &self.node
    }

    /// Name associated with the reader associated with the view.
    pub fn name(&self) -> &Relation {
        &self.name
    }

    /// Returns a reference to the list of socket addresses for the view's shards
    #[must_use]
    pub fn shard_addrs(&self) -> &[SocketAddr] {
        self.shard_addrs.as_ref()
    }

    /// Returns the number of times this view is sharded
    #[must_use]
    pub fn num_shards(&self) -> usize {
        self.shard_addrs.len()
    }

    /// Get the current size of this view.
    ///
    /// Note that you must also continue to poll this `View` for the returned future to resolve.
    pub async fn len(&mut self) -> ReadySetResult<usize> {
        future::poll_fn(|cx| self.poll_ready(cx)).await?;

        let node = self.node;
        let name = self.name.clone();
        let mut rsps = self
            .shards
            .iter_mut()
            .enumerate()
            .map(|(shardi, shard)| {
                shard.call(Instrumented::from(Tagged::from(ReadQuery::Size {
                    target: ReaderAddress {
                        node,
                        name: name.clone(),
                        shard: shardi,
                    },
                })))
            })
            .collect::<FuturesUnordered<_>>();

        let mut nrows = 0;
        while let Some(reply) = rsps
            .next()
            .await
            .transpose()
            .map_err(rpc_err!("View::len"))?
        {
            if let ReadReply::Size(rows) = reply.v {
                nrows += rows;
            } else {
                unreachable!();
            }
        }

        Ok(nrows)
    }

    /// Get the placeholder to key column index mapping for the reader node
    /// Each pair represents a mapping from placeholder index to reader key column index
    pub fn key_map(&self) -> &[(ViewPlaceholder, KeyColumnIdx)] {
        &self.key_mapping
    }

    /// Get the current keys of this view. For debugging only.
    pub async fn keys(&mut self) -> ReadySetResult<Vec<Vec<DfValue>>> {
        future::poll_fn(|cx| self.poll_ready(cx)).await?;

        let node = self.node;
        let name = self.name.clone();
        let mut rsps = self
            .shards
            .iter_mut()
            .enumerate()
            .map(|(shardi, shard)| {
                shard.call(Instrumented::from(Tagged::from(ReadQuery::Keys {
                    target: ReaderAddress {
                        node,
                        name: name.clone(),
                        shard: shardi,
                    },
                })))
            })
            .collect::<FuturesUnordered<_>>();

        let mut vec = vec![];
        while let Some(reply) = rsps
            .next()
            .await
            .transpose()
            .map_err(rpc_err!("View::keys"))?
        {
            if let ReadReply::Keys(mut keys) = reply.v {
                vec.append(&mut keys);
            } else {
                unreachable!();
            }
        }

        Ok(vec)
    }

    /// Issue a raw `ViewQuery` against this view, and return the results.
    ///
    /// The method will block if the results are not yet available only when `block` is `true`.
    /// If `block` is false, misses will be returned as empty results. Any requested keys that have
    /// missing state will be backfilled (asynchronously if `block` is `false`).
    pub async fn raw_lookup(&mut self, query: ViewQuery) -> ReadySetResult<ResultIterator> {
        future::poll_fn(|cx| self.poll_ready(cx)).await?;
        match self.call(query).await? {
            LookupResult::NonBlockingMiss => Err(ReadySetError::ReaderMissingKey),
            LookupResult::Results(results, _) => Ok(ResultIterator::owned(results)),
        }
    }

    /// Retrieve the query results for the given parameter value.
    ///
    /// The method will block if the results are not yet available only when `block` is `true`.
    pub async fn lookup(&mut self, key: &[DfValue], block: bool) -> ReadySetResult<ResultIterator> {
        let key = Vec1::try_from_vec(key.into())
            .map_err(|_| view_err(self.node, ReadySetError::EmptyKey))?;
        self.multi_lookup(vec![KeyComparison::Equal(key)], block)
            .await
    }

    /// Retrieve the query results for the given parameter values.
    ///
    /// The method will block if the results are not yet available only when `block` is `true`.
    /// If `block` is false, misses will be returned as empty results. Any requested keys that have
    /// missing state will be backfilled (asynchronously if `block` is `false`).
    pub async fn multi_lookup(
        &mut self,
        key_comparisons: Vec<KeyComparison>,
        block: bool,
    ) -> ReadySetResult<ResultIterator> {
        self.raw_lookup((key_comparisons, block).into()).await
    }

    /// Build a [`ViewQuery`] for performing a lookup against this [`ReaderHandle`]
    #[allow(clippy::too_many_arguments)]
    fn build_view_query(
        &self,
        key_remap: Option<&HashMap<PlaceholderIdx, Literal>>,
        raw_keys: Vec<Cow<'_, [DfValue]>>,
        limit: Option<usize>,
        offset: Option<usize>,
        blocking_read: bool,
        dialect: Dialect,
    ) -> ReadySetResult<ViewQuery> {
        trace!("select::lookup");

        let (keys, filters) = if raw_keys.is_empty() {
            let bogo = vec![vec1![DfValue::from(0i32)].into()];
            (bogo, Vec::new())
        } else {
            let mut key_comparison_builder = KeyComparisonBuilder::new(self, key_remap, dialect)?;

            let keys = raw_keys
                .into_iter()
                .filter_map(|key| key_comparison_builder.build_key(key).transpose())
                .collect::<ReadySetResult<Vec<_>>>()?;

            (keys, key_comparison_builder.filters)
        };

        trace!(?keys, ?filters, "Built view query");

        Ok(ViewQuery {
            key_comparisons: keys,
            block: blocking_read,
            filter: filters.into_iter().reduce(|expr1, expr2| DfExpr::Op {
                left: Box::new(expr1),
                op: DfBinaryOperator::And,
                right: Box::new(expr2),
                ty: DfType::Bool, // AND is a boolean operator
            }),
            limit,
            offset,
        })
    }
}

struct KeyComparisonBuilder<'a> {
    mixed_binops: bool,
    filters: Vec<DfExpr>,
    key_map: &'a [(ViewPlaceholder, KeyColumnIdx)],
    key_types: HashMap<usize, &'a DfType>,
    key_remap: Option<&'a HashMap<PlaceholderIdx, Literal>>,
    binop_to_use: BinaryOperator,
    dialect: Dialect,
}

impl<'a> KeyComparisonBuilder<'a> {
    fn new(
        reader_handle: &'a ReaderHandle,
        key_remap: Option<&'a HashMap<PlaceholderIdx, Literal>>,
        dialect: Dialect,
    ) -> ReadySetResult<KeyComparisonBuilder<'a>> {
        let schema = reader_handle
            .schema()
            .ok_or_else(|| internal_err!("No schema for view"))?;

        let key_types = schema.col_types(
            reader_handle
                .key_map()
                .iter()
                .map(|(_, key_column_idx)| *key_column_idx),
            SchemaType::ProjectedSchema,
        )?;

        let mut current_binop = None;
        // Whether we have multiple different binary operators in our key comparisons
        let mixed_binops =
            !reader_handle
                .key_map()
                .iter()
                .all(|(placeholder, _)| match placeholder {
                    // Mixed binops if we see any two different binops in OneToOne placeholders
                    ViewPlaceholder::OneToOne(_, binop) => {
                        current_binop.get_or_insert(*binop) == binop
                    }
                    // Between uses mixed binops
                    ViewPlaceholder::Between(_, _) => false,
                    // Generated and PageNumber placeholders can be used
                    ViewPlaceholder::Generated | ViewPlaceholder::PageNumber { .. } => true,
                });
        // The binary operator we will use to build our key if we do not have a mixed comparison
        let binop_to_use = current_binop.unwrap_or(BinaryOperator::Equal);

        let key_types: HashMap<usize, &DfType> = reader_handle
            .key_map()
            .iter()
            .zip(key_types)
            .map(|((_, key_column_idx), key_type)| (*key_column_idx, key_type))
            .collect();

        Ok(Self {
            key_remap,
            dialect,
            mixed_binops,
            binop_to_use,
            key_types,
            key_map: reader_handle.key_map(),
            filters: Vec::new(),
        })
    }

    // parameter numbering is 1-based, but vecs are 0-based, so subtract 1
    // also, no from_ty since the key value is a literal
    fn remap_key(
        &self,
        key: &[DfValue],
        idx: &PlaceholderIdx,
        key_type: &DfType,
    ) -> ReadySetResult<DfValue> {
        let key = match self.key_remap {
            Some(remap) => {
                match remap.get(idx).ok_or_else(|| {
                    internal_err!("Key remapping for ReusedReaderHandle is missing indices")
                })? {
                    Literal::Placeholder(ItemPlaceholder::DollarNumber(idx)) => {
                        key.get(*idx as usize - 1).ok_or_else(|| {
                            internal_err!(
                                "Key remapping for ReusedReaderHandle contains erroneous index"
                            )
                        })?
                    }
                    Literal::Placeholder(_) => {
                        internal!(
                            "Key remapping for ReusedReaderHandle contains non-numbered placeholder"
                        )
                    }
                    literal => &DfValue::try_from_dialect(literal, self.dialect.into())?,
                }
            }
            None => key.get(*idx - 1).ok_or_else(|| {
                internal_err!(
                    "Key remapping for ReusedReaderHandle tries to access non-existent index"
                )
            })?,
        };
        key.coerce_for_comparison(key_type)
    }

    fn build_key(&mut self, raw_key: Cow<'_, [DfValue]>) -> ReadySetResult<Option<KeyComparison>> {
        let mut k = vec![];
        let mut bounds: Option<(Vec<DfValue>, Vec<DfValue>)> = if self.mixed_binops {
            Some((vec![], vec![]))
        } else {
            None
        };

        // All ViewPlaceholder indices must be remapped using key_remap
        for (view_placeholder, key_column_idx) in self.key_map {
            match view_placeholder {
                ViewPlaceholder::Generated => continue,
                ViewPlaceholder::OneToOne(idx, binop) => {
                    let key_type = *self
                        .key_types
                        .get(key_column_idx)
                        .ok_or_else(|| internal_err!("No key_type for key"))?;

                    let value = self.remap_key(raw_key.as_ref(), idx, key_type)?;

                    // Skip the key entirely if the value in the key is NULL, since we don't want
                    // to return any results for comparisons against NULL values
                    if value.is_none() {
                        return Ok(None);
                    }

                    let make_op = |(op, negated): (DfBinaryOperator, bool)| {
                        let op = DfExpr::Op {
                            left: Box::new(DfExpr::Column {
                                index: *key_column_idx,
                                ty: key_type.clone(),
                            }),
                            op,
                            right: Box::new(DfExpr::Literal {
                                val: value.clone(),
                                ty: key_type.clone(),
                            }),
                            ty: DfType::Bool, // TODO: infer type
                        };
                        if negated {
                            DfExpr::Not {
                                expr: Box::new(op),
                                ty: DfType::Bool,
                            }
                        } else {
                            op
                        }
                    };

                    if let Some((lower_bound, upper_bound)) = &mut bounds {
                        match binop {
                            BinaryOperator::Equal => {
                                lower_bound.push(value.clone());
                                upper_bound.push(value);
                            }
                            BinaryOperator::GreaterOrEqual => {
                                self.filters
                                    .push(make_op((DfBinaryOperator::GreaterOrEqual, false)));
                                lower_bound.push(value);
                                upper_bound.push(DfValue::Max);
                            }
                            BinaryOperator::LessOrEqual => {
                                self.filters
                                    .push(make_op((DfBinaryOperator::LessOrEqual, false)));
                                lower_bound.push(DfValue::None); // NULL is the minimum DfValue
                                upper_bound.push(value);
                            }
                            BinaryOperator::Greater => {
                                self.filters
                                    .push(make_op((DfBinaryOperator::Greater, false)));
                                lower_bound.push(value);
                                upper_bound.push(DfValue::Max);
                            }
                            BinaryOperator::Less => {
                                self.filters.push(make_op((DfBinaryOperator::Less, false)));
                                lower_bound.push(DfValue::None); // NULL is the minimum DfValue
                                upper_bound.push(value);
                            }
                            op => unsupported!("Unsupported binary operator in query: `{}`", op),
                        }
                    } else {
                        // We need to additionally filter post-lookup for certain
                        // compound ranges, since we
                        // always sort keys lexicographically within the
                        // reader map. This is the case for...
                        if (
                            // All keys within open (exclusive) ranges (consider eg:
                            //     (1, 2) > (1, 1)
                            //     even though
                            //     NOT (1 > 1 && 2 > 1)
                            // )
                            matches!(
                                        self.binop_to_use,
                                        BinaryOperator::Less | BinaryOperator::Greater
                                    )
                                        // As long as the range is actually compound
                                        && self.key_map.len() > 1
                        ) || (
                            // Or all other range keys beyond the *first* key within a
                            // compound range
                            self.binop_to_use != BinaryOperator::Equal && !k.is_empty()
                        ) {
                            self.filters.push(make_op(DfBinaryOperator::from_sql_op(
                                self.binop_to_use,
                                self.dialect,
                                key_type,
                                key_type,
                            )?));
                        }
                        k.push(value);
                    }
                }
                ViewPlaceholder::Between(lower_idx, upper_idx) => {
                    let key_type = self.key_types[key_column_idx];

                    let lower_value = self.remap_key(raw_key.as_ref(), lower_idx, key_type)?;
                    let upper_value = self.remap_key(raw_key.as_ref(), upper_idx, key_type)?;

                    // Skip the key entirely if the value in the key is NULL, since we don't want
                    // to return any results for comparisons against NULL values
                    if lower_value.is_none() || upper_value.is_none() {
                        return Ok(None);
                    }

                    let (lower_key, upper_key) = bounds.get_or_insert_with(Default::default);
                    lower_key.push(lower_value);
                    upper_key.push(upper_value);
                }
                ViewPlaceholder::PageNumber {
                    offset_placeholder,
                    limit,
                } => {
                    // offset parameters should always be a BigInt
                    let offset: u64 = self
                        .remap_key(raw_key.as_ref(), offset_placeholder, &DfType::BigInt)?
                        .try_into()?;
                    if !offset.is_multiple_of(*limit) {
                        unsupported!("OFFSET must currently be an integer multiple of LIMIT");
                    }
                    let page_number = offset / *limit;
                    k.push(page_number.into());
                }
            };
        }

        if let Some((lower, upper)) = bounds {
            debug_assert!(k.is_empty());
            Ok(Some(KeyComparison::Range((
                Bound::Included(lower.try_into()?),
                Bound::Included(upper.try_into()?),
            ))))
        } else {
            KeyComparison::from_key_and_operator(k, self.binop_to_use).map(Some)
        }
    }
}

impl ReusedReaderHandle {
    /// Get a reference to the reused [`ReaderHandle`].
    pub fn inner(&self) -> &ReaderHandle {
        &self.reader_handle
    }

    /// Get a mut reference to the reused [`ReaderHandle`].
    pub fn inner_mut(&mut self) -> &mut ReaderHandle {
        &mut self.reader_handle
    }

    /// Get the remapping of [`PlaceholderIdx`] in the cached query to [`Literal`]s in a query that
    /// is reusing this ReaderHandle.
    pub fn key_remapping(&self) -> &HashMap<PlaceholderIdx, Literal> {
        &self.key_remapping
    }

    /// Get the [`PlaceholderIdx`]s that correspond to inlined values in the cache. The values
    /// passed to these placeholders on execution must match the [`Literal`] values in this map.
    pub fn required_values(&self) -> &HashMap<PlaceholderIdx, Literal> {
        &self.required_values
    }

    /// Build a view query
    #[allow(clippy::too_many_arguments)]
    fn build_view_query(
        &self,
        raw_keys: Vec<Cow<'_, [DfValue]>>,
        limit: Option<usize>,
        offset: Option<usize>,
        blocking_read: bool,
        dialect: Dialect,
    ) -> ReadySetResult<Option<ViewQuery>> {
        // If any placeholders in our query correspond to inlined values in the migrated query,
        // verify that we are executing our query with these values.
        for params in raw_keys.iter() {
            for (idx, val) in &self.required_values {
                // Placeholders are 1-indexed, but params are 0-indexed
                let client_val = params.get(idx - 1).ok_or_else(|| {
                    internal_err!(
                        "Received fewer parameters than expected. Error indexing at position {}",
                        idx - 1
                    )
                })?;
                // Return None if keys do not satisfy the required values
                if DfValue::try_from_dialect(val, dialect.into())? != *client_val {
                    return Ok(None);
                }
            }
        }

        let raw_keys = if !raw_keys.is_empty() && self.required_values.len() == raw_keys[0].len() {
            vec![]
        } else {
            raw_keys
        };

        self.reader_handle
            .build_view_query(
                Some(&self.key_remapping),
                raw_keys,
                limit,
                offset,
                blocking_read,
                dialect,
            )
            .map(Some)
    }
}

impl View {
    /// Builds a [`ViewQuery`] that can be run against a [`ReaderHandle`]. If self is
    /// [`View::MultipleReused`], then we will find a [`ReaderHandle`] for the first Reader that can
    /// satisfy the given keys or return None. Also returns a reference to the [`ReaderHandle`] to
    /// indicate which handle the [`ViewQuery`] corresponds to.
    #[allow(clippy::too_many_arguments)]
    pub fn build_view_query(
        &mut self,
        raw_keys: Vec<Cow<'_, [DfValue]>>,
        limit: Option<usize>,
        offset: Option<usize>,
        blocking_read: bool,
        dialect: Dialect,
    ) -> ReadySetResult<Option<(&mut ReaderHandle, ViewQuery)>> {
        // If any placeholders in our query correspond to inlined values in the migrated query,
        // verify that we are executing our query with these values.
        match self {
            View::Single(handle) => handle
                .build_view_query(None, raw_keys, limit, offset, blocking_read, dialect)
                .map(|vq| Some((handle, vq))),
            View::MultipleReused(handles) => {
                let mut last_error = None;
                for reused_handle in handles {
                    match reused_handle.build_view_query(
                        raw_keys.clone(),
                        limit,
                        offset,
                        blocking_read,
                        dialect,
                    ) {
                        Ok(Some(vq)) => {
                            return Ok(Some((reused_handle.inner_mut(), vq)));
                        }
                        Err(err) => {
                            last_error = Some(err);
                        }
                        Ok(None) => {
                            /* Parameters did not satisfy inlined view so try another. */
                        }
                    }
                }

                if let Some(err) = last_error {
                    Err(err)
                } else {
                    Ok(None) // Parameters did not satisfy any inlined view
                }
            }
        }
    }

    /// Returns a single ReaderHandle if Self is [`View::Single`]
    pub fn into_reader_handle(self) -> Option<ReaderHandle> {
        match self {
            View::Single(rh) => Some(rh),
            View::MultipleReused(_) => None,
        }
    }

    /// Returns a mut reference to a single ReaderHandle if Self is [`View::Single`]
    pub fn as_mut_reader_handle(&mut self) -> Option<&mut ReaderHandle> {
        match self {
            View::Single(rh) => Some(rh),
            View::MultipleReused(_) => None,
        }
    }
}

#[derive(Debug, Default)]
#[repr(transparent)]
pub struct ReadReplyBatch(pub Vec<Vec<DfValue>>);

use serde::de::{self, DeserializeSeed, Deserializer, Visitor};
impl<'de> Deserialize<'de> for ReadReplyBatch {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct Elem;

        impl Visitor<'_> for Elem {
            type Value = Vec<Vec<DfValue>>;

            fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("Vec<Vec<DfValue>>")
            }

            fn visit_bytes<E>(self, bytes: &[u8]) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                use bincode::Options;
                bincode::options()
                    .deserialize(bytes)
                    .map_err(de::Error::custom)
            }
        }

        impl<'de> DeserializeSeed<'de> for Elem {
            type Value = Vec<Vec<DfValue>>;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: Deserializer<'de>,
            {
                deserializer.deserialize_bytes(self)
            }
        }

        deserializer.deserialize_bytes(Elem).map(ReadReplyBatch)
    }
}

impl From<ReadReplyBatch> for Vec<Vec<DfValue>> {
    fn from(val: ReadReplyBatch) -> Vec<Vec<DfValue>> {
        val.0
    }
}

impl From<Vec<Vec<DfValue>>> for ReadReplyBatch {
    fn from(v: Vec<Vec<DfValue>>) -> Self {
        Self(v)
    }
}

impl IntoIterator for ReadReplyBatch {
    type Item = Vec<DfValue>;
    type IntoIter = std::vec::IntoIter<Self::Item>;
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl Extend<Vec<DfValue>> for ReadReplyBatch {
    fn extend<T>(&mut self, iter: T)
    where
        T: IntoIterator<Item = Vec<DfValue>>,
    {
        self.0.extend(iter)
    }
}

impl AsRef<[Vec<DfValue>]> for ReadReplyBatch {
    fn as_ref(&self) -> &[Vec<DfValue>] {
        &self.0[..]
    }
}

impl std::ops::Deref for ReadReplyBatch {
    type Target = Vec<Vec<DfValue>>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::ops::DerefMut for ReadReplyBatch {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[allow(clippy::eq_op)]
    mod key_comparison {
        use readyset_util::eq_laws;

        use super::*;

        eq_laws!(KeyComparison);
    }

    mod build_view_query {
        use std::net::{IpAddr, Ipv4Addr};

        use dataflow_expression::Dialect as DfDialect;
        use readyset_sql::{ast::Column, Dialect};
        use vec1::vec1;

        use super::*;

        fn make_build_query(
            raw_keys: Vec<Cow<'_, [DfValue]>>,
            limit: Option<usize>,
            offset: Option<usize>,
            key_map: &[(ViewPlaceholder, KeyColumnIdx)],
            dialect: Dialect,
        ) -> ViewQuery {
            let schema = ViewSchema::new(
                vec![
                    ColumnSchema {
                        column: Column {
                            name: "x".into(),
                            table: Some("t".into()),
                        },
                        column_type: DfType::Int,
                        base: Some(ColumnBase {
                            table: "t".into(),
                            column: "x".into(),
                            constraints: vec![],
                            table_oid: None,
                            attnum: None,
                            sql_type: SqlType::Int(None),
                        }),
                    },
                    ColumnSchema {
                        column: Column {
                            name: "y".into(),
                            table: Some("t".into()),
                        },
                        column_type: DfType::DEFAULT_TEXT,
                        base: Some(ColumnBase {
                            table: "t".into(),
                            column: "y".into(),
                            constraints: vec![],
                            table_oid: None,
                            attnum: None,
                            sql_type: SqlType::Text,
                        }),
                    },
                ],
                vec![
                    ColumnSchema {
                        column: Column {
                            name: "x".into(),
                            table: Some("t".into()),
                        },
                        column_type: DfType::Int,
                        base: Some(ColumnBase {
                            table: "t".into(),
                            column: "x".into(),
                            constraints: vec![],
                            table_oid: None,
                            attnum: None,
                            sql_type: SqlType::Int(None),
                        }),
                    },
                    ColumnSchema {
                        column: Column {
                            name: "y".into(),
                            table: Some("t".into()),
                        },
                        column_type: DfType::DEFAULT_TEXT,
                        base: Some(ColumnBase {
                            table: "t".into(),
                            column: "y".into(),
                            constraints: vec![],
                            table_oid: None,
                            attnum: None,
                            sql_type: SqlType::Text,
                        }),
                    },
                ],
            );
            // Create a fake shard - will not be used for test. Worker is not spawned
            let (c, _) = Buffer::pair(
                Timeout::new(
                    ConcurrencyLimit::new(
                        Balance::new(make_views_discover(
                            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
                            Duration::new(0, 10),
                        )),
                        crate::PENDING_LIMIT,
                    ),
                    Duration::new(1, 0),
                ),
                crate::BUFFER_TO_POOL,
            );
            // Only the schema and key_mapping are used to build a ViewQuery
            let reader_handle = ReaderHandle {
                name: Relation::from("test"), // Not used for test
                node: NodeIndex::new(0),      // Not used for test
                columns: Arc::new([]),        // Not used for test
                schema: Some(schema),
                key_mapping: key_map.to_vec(),
                shards: Vec1::new(c), // Not used for test
                shard_addrs: vec![],  // Not used for test
            };
            let dataflow_dialect = match dialect {
                Dialect::MySQL => DfDialect::DEFAULT_MYSQL,
                Dialect::PostgreSQL => DfDialect::DEFAULT_POSTGRESQL,
            };
            let mut view = View::Single(reader_handle);
            view.build_view_query(raw_keys, limit, offset, true, dataflow_dialect)
                .unwrap()
                .unwrap()
                .1
        }

        #[test]
        fn simple_point_lookup() {
            // "SELECT t.x FROM t WHERE t.x = $1"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1)])],
                None,
                None,
                &[(ViewPlaceholder::OneToOne(1, BinaryOperator::Equal), 0)],
                Dialect::MySQL,
            );

            assert!(query.filter.is_none());
            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::from(vec1![DfValue::from(1)])]
            );
        }

        #[test]
        fn single_between() {
            // "SELECT t.x FROM t WHERE t.x BETWEEN $1 AND $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from(2)])],
                None,
                None,
                &[(ViewPlaceholder::Between(1, 2), 0)],
                Dialect::MySQL,
            );

            assert!(query.filter.is_none());
            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::from_range(
                    &(vec1![DfValue::from(1)]..=vec1![DfValue::from(2)])
                )]
            );
        }

        #[test]
        fn mixed_equal_and_inclusive() {
            // "SELECT t.x FROM t WHERE t.x >= $1 AND t.y = $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from("a")])],
                None,
                None,
                &[
                    (ViewPlaceholder::OneToOne(2, BinaryOperator::Equal), 1),
                    (
                        ViewPlaceholder::OneToOne(1, BinaryOperator::GreaterOrEqual),
                        0,
                    ),
                ],
                Dialect::MySQL,
            );

            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::from_range(
                    &(vec1![DfValue::from("a"), DfValue::from(1)]
                        ..=vec1![DfValue::from("a"), DfValue::Max])
                )]
            );
        }

        #[test]
        fn mixed_equal_and_between() {
            // "SELECT t.x FROM t WHERE t.x BETWEEN $1 AND $2 AND t.y = $3"
            let query = make_build_query(
                vec![Cow::Owned(vec![
                    DfValue::from(1),
                    DfValue::from(2),
                    DfValue::from("a"),
                ])],
                None,
                None,
                &[
                    (ViewPlaceholder::OneToOne(3, BinaryOperator::Equal), 1),
                    (ViewPlaceholder::Between(1, 2), 0),
                ],
                Dialect::MySQL,
            );

            assert!(query.filter.is_none());
            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::from_range(
                    &(vec1![DfValue::from("a"), DfValue::from(1)]
                        ..=vec1![DfValue::from("a"), DfValue::from(2)])
                )]
            );
        }

        #[test]
        fn mixed_equal_and_exclusive() {
            // "SELECT t.x FROM t WHERE t.x > $1 AND t.y = $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from("a")])],
                None,
                None,
                &[
                    (ViewPlaceholder::OneToOne(2, BinaryOperator::Equal), 1),
                    (ViewPlaceholder::OneToOne(1, BinaryOperator::Greater), 0),
                ],
                Dialect::MySQL,
            );

            assert_eq!(
                query.filter,
                Some(DfExpr::Op {
                    left: Box::new(DfExpr::Column {
                        index: 0,
                        ty: DfType::Int
                    }),
                    op: DfBinaryOperator::Greater,
                    right: Box::new(DfExpr::Literal {
                        val: 1.into(),
                        ty: DfType::Int
                    }),
                    ty: DfType::Bool,
                })
            );
            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::from_range(
                    &(vec1![DfValue::from("a"), DfValue::from(1)]
                        ..=vec1![DfValue::from("a"), DfValue::Max])
                )]
            );
        }

        #[test]
        fn compound_range_open() {
            // "SELECT t.x FROM t WHERE t.x > $1 AND t.y > $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from("a")])],
                None,
                None,
                &[
                    (ViewPlaceholder::OneToOne(1, BinaryOperator::Greater), 0),
                    (ViewPlaceholder::OneToOne(2, BinaryOperator::Greater), 1),
                ],
                Dialect::MySQL,
            );

            assert_eq!(
                query.filter,
                Some(DfExpr::Op {
                    left: Box::new(DfExpr::Op {
                        left: Box::new(DfExpr::Column {
                            index: 0,
                            ty: DfType::Int
                        }),
                        op: DfBinaryOperator::Greater,
                        right: Box::new(DfExpr::Literal {
                            val: 1.into(),
                            ty: DfType::Int
                        }),
                        ty: DfType::Bool
                    }),
                    op: DfBinaryOperator::And,
                    right: Box::new(DfExpr::Op {
                        left: Box::new(DfExpr::Column {
                            index: 1,
                            ty: DfType::DEFAULT_TEXT
                        }),
                        op: DfBinaryOperator::Greater,
                        right: Box::new(DfExpr::Literal {
                            val: "a".into(),
                            ty: DfType::DEFAULT_TEXT
                        }),
                        ty: DfType::Bool
                    }),
                    ty: DfType::Bool
                })
            );

            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::Range(
                    vec1![DfValue::from(1), DfValue::from("a")].range_from()
                )]
            );
        }

        #[test]
        fn compound_range_closed() {
            // "SELECT t.x FROM t WHERE t.x >= $1 AND t.y >= $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from("a")])],
                None,
                None,
                &[
                    (
                        ViewPlaceholder::OneToOne(1, BinaryOperator::GreaterOrEqual),
                        0,
                    ),
                    (
                        ViewPlaceholder::OneToOne(2, BinaryOperator::GreaterOrEqual),
                        1,
                    ),
                ],
                Dialect::MySQL,
            );

            assert_eq!(
                query.filter,
                Some(DfExpr::Op {
                    left: Box::new(DfExpr::Column {
                        index: 1,
                        ty: DfType::DEFAULT_TEXT
                    }),
                    op: DfBinaryOperator::GreaterOrEqual,
                    right: Box::new(DfExpr::Literal {
                        val: "a".into(),
                        ty: DfType::DEFAULT_TEXT
                    }),
                    ty: DfType::Bool
                })
            );

            assert_eq!(
                query.key_comparisons,
                vec![KeyComparison::Range(
                    vec1![DfValue::from(1), DfValue::from("a")].range_from_inclusive(),
                )]
            );
        }

        #[test]
        fn paginated_with_key() {
            // "SELECT t.x FROM t WHERE t.x = $1 ORDER BY t.y ASC LIMIT 3 OFFSET $2"
            let query = make_build_query(
                vec![Cow::Owned(vec![DfValue::from(1), DfValue::from(3)])],
                Some(3),
                Some(3),
                &[
                    (ViewPlaceholder::OneToOne(1, BinaryOperator::Equal), 0),
                    (
                        ViewPlaceholder::PageNumber {
                            offset_placeholder: 2,
                            limit: 3,
                        },
                        1,
                    ),
                ],
                Dialect::MySQL,
            );

            assert_eq!(query.filter, None);

            assert_eq!(
                query.key_comparisons,
                vec![vec1![
                    DfValue::from(1),
                    DfValue::from(1) // page 2
                ]
                .into()]
            );
        }
    }
}
